/**
 * Copyright (c) 2015, 玛雅牛［李飞］ (lifei@wellbole.com).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.jfinal.plugin.zbus;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.client.Broker;
import org.zstacks.zbus.client.Consumer;
import org.zstacks.zbus.client.Producer;
import org.zstacks.zbus.client.ZbusException;
import org.zstacks.zbus.client.broker.HaBroker;
import org.zstacks.zbus.client.broker.HaBrokerConfig;
import org.zstacks.zbus.client.broker.SingleBroker;
import org.zstacks.zbus.client.broker.SingleBrokerConfig;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.znet.callback.MessageCallback;

import com.jfinal.plugin.IPlugin;
import com.jfinal.plugin.zbus.callback.TMessageCallback;
import com.jfinal.plugin.zbus.coder.Coder;
import com.jfinal.plugin.zbus.coder.JsonCoder;

/**
 * JFinal plugin implementation for Zbus.
 *
 * <p>
 * Usage: declare the queues, topics and message callbacks through the
 * {@code create*} / {@code register*} methods, then let JFinal call
 * {@link #start()}. On start the plugin creates the broker, one producer per
 * declared queue / topic queue and one consumer per registered callback, and
 * hands the producers and the coder to {@code Zbus.init}. {@link #stop()}
 * closes the consumers and the broker again.
 * </p>
 *
 * <p>
 * The configuration collections are plain {@code ArrayList} / {@code HashMap}
 * instances and are not synchronized by this class.
 * </p>
 *
 * @author Li Fei (lifei@wellbole.com)
 * @date July 29, 2015 12:46:32 PM
 * @since V1.0.0
 */
public class ZbusPlugin implements IPlugin {
	/**
	 * Logger.
	 */
	private static final Logger LOG = LoggerFactory.getLogger(ZbusPlugin.class);

	/**
	 * MQ producer configuration: names of the queues a producer is created for.
	 */
	private final List<String> mqNameList = new ArrayList<String>();

	/**
	 * Topic producer configuration: mq name -> topics (one mq may have several
	 * topics).
	 */
	private final Map<String, List<String>> mqNameTopicListMap = new HashMap<String, List<String>>();

	/**
	 * MQ consumer configuration: mq name -> message callback.
	 */
	private final Map<String, MessageCallback> mqNameMessageCallbackMap = new HashMap<String, MessageCallback>();

	/**
	 * Topic consumer configuration: mq name -> (topic -> message callback).
	 */
	private final Map<String, Map<String, MessageCallback>> mqNamePubSubMessageCallbackMap = new HashMap<String, Map<String, MessageCallback>>();

	/**
	 * Consumers created by {@link #start()}; closed and cleared when the plugin
	 * releases its resources.
	 */
	private final List<Consumer> consumerList = new ArrayList<Consumer>();

	/**
	 * Encoder/decoder handed to {@code Zbus.init}; defaults to the JSON based
	 * implementation.
	 */
	private Coder coder = new JsonCoder();

	/**
	 * Address of the zbus server (used for logging only in this class).
	 */
	private final String brokerAddress;

	/**
	 * The broker instance; {@code null} until {@link #start()} has created it
	 * and again after the resources have been released.
	 */
	private Broker broker = null;

	/**
	 * High-availability broker configuration; {@code null} in single-broker
	 * mode.
	 */
	private final HaBrokerConfig haBrokerConfig;

	/**
	 * Single broker configuration; {@code null} in high-availability mode.
	 */
	private final SingleBrokerConfig singleBrokerConfig;

	/**
	 * Default constructor, uses the address 127.0.0.1:15555.
	 */
	public ZbusPlugin() {
		this("127.0.0.1:15555");
	}

	/**
	 * Creates a single-broker plugin for the given broker address.
	 *
	 * @param brokerAddress
	 *            address of the zbus broker
	 */
	public ZbusPlugin(String brokerAddress) {
		this.haBrokerConfig = null;
		this.brokerAddress = brokerAddress;
		this.singleBrokerConfig = new SingleBrokerConfig();
		this.singleBrokerConfig.setBrokerAddress(brokerAddress);
	}

	/**
	 * Creates a high-availability plugin from the given configuration.
	 *
	 * @param config
	 *            HA broker configuration
	 */
	public ZbusPlugin(HaBrokerConfig config) {
		this.haBrokerConfig = config;
		// In HA mode there is no single broker address; the track address list is kept instead.
		this.brokerAddress = config.getTrackAddrList();
		this.singleBrokerConfig = null;
	}

	/**
	 * Creates a single-broker plugin from the given configuration.
	 *
	 * @param config
	 *            single broker configuration
	 */
	public ZbusPlugin(SingleBrokerConfig config) {
		this.haBrokerConfig = null;
		this.singleBrokerConfig = config;
		this.brokerAddress = config.getBrokerAddress();
	}

	/**
	 * Replaces the encoder/decoder that is passed to {@code Zbus.init} on start.
	 *
	 * @param coder
	 *            encoder/decoder
	 * @since V1.0.0
	 */
	public final void setCoder(Coder coder) {
		this.coder = coder;
	}

	/**
	 * Declares an MQ queue; a producer for it is created on {@link #start()}.
	 *
	 * @param mq
	 *            MQ queue name
	 * @throws ZbusException
	 *             if the queue name has already been declared
	 * @since V1.0.0
	 */
	public void createMq(String mq) {
		if (mqNameList.contains(mq)) {
			String log = "(mp=" + mq + ") is exists,Can not regist!";
			LOG.error(log);
			throw new ZbusException(log);
		}
		mqNameList.add(mq);
	}

	/**
	 * Declares a PubSub (publish/subscribe) topic on the given MQ queue.
	 *
	 * @param mq
	 *            MQ queue name
	 * @param topic
	 *            topic name
	 * @throws ZbusException
	 *             if the topic has already been declared for this queue
	 * @since V1.0.0
	 */
	public void createTopic(String mq, String topic) {
		// Look up the topics already declared for this mq.
		List<String> topicList = this.mqNameTopicListMap.get(mq);
		if (null == topicList) {
			topicList = new ArrayList<String>();
		}
		if (topicList.contains(topic)) {
			String log = "(mp=" + mq + ",topic=" + topic + ")" + " is exists,Can not regist!";
			LOG.error(log);
			throw new ZbusException(log);
		}
		topicList.add(topic);
		this.mqNameTopicListMap.put(mq, topicList);
	}

	/**
	 * Registers the message callback for an MQ queue. A callback that was
	 * registered earlier for the same queue is replaced (a warning is logged).
	 *
	 * @param mq
	 *            MQ name
	 * @param cb
	 *            callback invoked when a message arrives
	 * @since V1.0.0
	 */
	public void registerMqMessageCallback(String mq, TMessageCallback<?> cb) {
		if (mqNameMessageCallbackMap.containsKey(mq)) {
			LOG.warn(mq + "is exists!");
		}
		mqNameMessageCallbackMap.put(mq, cb);
	}

	/**
	 * Registers the message callback for a topic of an MQ queue. A callback
	 * registered earlier for the same queue/topic pair is replaced.
	 *
	 * @param mq
	 *            MQ name
	 * @param topic
	 *            topic name
	 * @param cb
	 *            callback invoked when a message arrives
	 * @since V1.0.0
	 */
	public void registerTopicMessageCallback(String mq, String topic, TMessageCallback<?> cb) {
		// Look up the topic -> callback map of this mq.
		Map<String, MessageCallback> tmc = this.mqNamePubSubMessageCallbackMap.get(mq);
		if (null == tmc) {
			tmc = new HashMap<String, MessageCallback>();
		}
		tmc.put(topic, cb);
		this.mqNamePubSubMessageCallbackMap.put(mq, tmc);
	}

	/**
	 * Creates the broker, the producers and the consumers described by the
	 * configuration and initializes {@code Zbus} with the producers and the
	 * coder.
	 *
	 * <p>
	 * If any step fails, the broker and the consumers created so far are closed
	 * again before the exception is rethrown, so a failed start does not leave
	 * connections open.
	 * </p>
	 *
	 * @return {@code true} when everything was created
	 * @throws ZbusException
	 *             wrapping the original exception if any step fails
	 */
	@Override
	public boolean start() {
		ConcurrentHashMap<String, Producer> producerMap = new ConcurrentHashMap<String, Producer>();
		try {
			if (this.singleBrokerConfig != null) {
				broker = new SingleBroker(this.singleBrokerConfig);
			} else {
				broker = new HaBroker(this.haBrokerConfig);
			}
			LOG.info("create broker successfully (brokerAddress={})", this.brokerAddress);

			// Create the MQ producers, keyed by mq name.
			for (String mq : this.mqNameList) {
				Producer producer = new Producer(broker, mq, MessageMode.MQ);
				producer.createMQ(); // not needed if the MQ is already known to exist
				producerMap.put(mq, producer);
				LOG.info("create mq Producer successfully (mq={})", mq);
			}

			// Create the topic producers: one PubSub producer per mq, shared by all of its topics.
			for (Entry<String, List<String>> mqConfig : this.mqNameTopicListMap.entrySet()) {
				String mq = mqConfig.getKey();
				Producer producer = new Producer(broker, mq, MessageMode.PubSub);
				producer.createMQ(); // not needed if the MQ is already known to exist
				for (String topic : mqConfig.getValue()) {
					String key = "_mq_" + mq + "_topic_" + topic;
					producerMap.put(key, producer);
					LOG.info("create topic Producer successfully (mp={},topic={})", mq, topic);
				}
			}

			// Create the MQ consumers.
			for (Entry<String, MessageCallback> entry : this.mqNameMessageCallbackMap.entrySet()) {
				String mq = entry.getKey();
				Consumer c = new Consumer(broker, mq, MessageMode.MQ);
				c.onMessage(entry.getValue());
				consumerList.add(c);
				LOG.info("create mq Consumer successfully (mq={})", mq);
			}

			// Create the topic consumers: one consumer per (mq, topic) pair.
			for (Entry<String, Map<String, MessageCallback>> mqConfig : this.mqNamePubSubMessageCallbackMap
					.entrySet()) {
				String mq = mqConfig.getKey();
				// topic -> MessageCallback map of this mq
				Map<String, MessageCallback> tmt = mqConfig.getValue();
				for (Entry<String, MessageCallback> topicConfig : tmt.entrySet()) {
					String topic = topicConfig.getKey();
					Consumer c = new Consumer(broker, mq, MessageMode.PubSub);
					c.setTopic(topic);
					c.onMessage(topicConfig.getValue());
					consumerList.add(c);
					LOG.info("create topic Consumer successfully (mq={},topic={})", mq, topic);
				}
			}

			// Initialize the Zbus facade.
			Zbus.init(producerMap, coder);
			return true;
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
			// Roll back: close whatever was opened before the failure.
			releaseResources();
			throw new ZbusException(e.getMessage(), e);
		}
	}

	/**
	 * Closes all consumers and the broker.
	 *
	 * <p>
	 * Every resource is closed independently: a failure while closing one
	 * consumer is logged and does not prevent the remaining consumers and the
	 * broker from being closed.
	 * </p>
	 *
	 * @return {@code true} if everything was closed without error,
	 *         {@code false} if at least one close failed
	 */
	@Override
	public boolean stop() {
		return releaseResources();
	}

	/**
	 * Closes every consumer in {@link #consumerList} and then the broker,
	 * logging (not propagating) individual failures, and resets the plugin so
	 * that a later start begins with an empty consumer list.
	 *
	 * @return {@code true} if no close call failed
	 */
	private boolean releaseResources() {
		boolean success = true;
		// Close the consumers one by one so a single failure cannot leak the others.
		for (Consumer c : consumerList) {
			try {
				c.close();
			} catch (Exception e) {
				LOG.error(e.getMessage(), e);
				success = false;
			}
		}
		// Drop the closed consumers; otherwise a restart would close them again.
		consumerList.clear();

		// Close the broker.
		if (this.broker != null) {
			try {
				this.broker.close();
			} catch (Exception e) {
				LOG.error(e.getMessage(), e);
				success = false;
			}
			this.broker = null;
		}
		return success;
	}
}
